package com.kaigejava.rocketmq.maindemo.consumer.shunxu;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

/**
 * @author 凯哥Java
 * @description 顺序消息的消费者
 * @company
 * @since 2022/10/19 8:47
 */
public class ShunXuConsumer {

    public static void main(String[] args) throws Exception {
        //：创建消费者consumer,指定消费者组名
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("group1");
        //2：指定nameserver地址
        pushConsumer.setNamesrvAddr("192.168.50.132:9876");
        //3：订阅主题Topic和Tag
        pushConsumer.subscribe("shunxu-topic","");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动，那么按照上次消费的位置继续消费
         */
        pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //4：设置回调函数，处理消息.顺序消息：MessageListenerOrderly 这个构造器
        pushConsumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                for (MessageExt msg : list) {
                    String str = new String(msg.getBody());
                    System.out.println("线程名称:【"+Thread.currentThread().getName()+"】 \t 顺序消息为:"+str);
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        //5：启动消费组consumer
        pushConsumer.start();
        System.out.println("顺序消费者启动了");

    }
}
